Architecture
We discussed Spark jobs in chapter 3. In this chapter, we will look at the architecture and how the master, workers, driver and executors are coordinated to finish a job.
Feel free to skip code if you prefer diagrams.
Deployment diagram
We have seen the following diagram in the Overview chapter.
Next, we will go into some of its details.
Job submission
The diagram below illustrates how the driver program (on the master node) produces a job and submits it to the worker nodes.
The driver-side behavior is equivalent to the code below:
```scala
finalRDD.action()
=> sc.runJob()

// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
=>   dagSchedulerEventProcessActor ! JobSubmitted
=> dagSchedulerEventProcessActor.JobSubmitted()
=> dagScheduler.handleJobSubmitted()
=> finalStage = newStage()
=>   mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
=> dagScheduler.submitStage()
=>   missingStages = dagScheduler.getMissingParentStages()
=> dagScheduler.submitMissingTasks(readyStage)

// add tasks to the taskScheduler
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> fifoSchedulableBuilder.addTaskSetManager(taskSet)

// send tasks
=> sparkDeploySchedulerBackend.reviveOffers()
=> driverActor ! ReviveOffers
=> sparkDeploySchedulerBackend.makeOffers()
=> sparkDeploySchedulerBackend.launchTasks()
=> foreach task
     CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)
```
Explanation:
When the following code is evaluated, the program launches a number of driver-side components, e.g. the job's connections to executors, thread pools, actors, etc.
```scala
val sc = new SparkContext(sparkConf)
```
This line defines the role of the driver.
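For orientation, a complete minimal driver program might look like the sketch below; this is an illustrative example, not code from this chapter, and the application name and `local[*]` master are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MinimalDriver {
  def main(args: Array[String]): Unit = {
    // Creating SparkContext is what makes this JVM the driver: it starts the
    // DAGScheduler, TaskScheduler, scheduler backend and their actors/threads.
    val sparkConf = new SparkConf().setAppName("minimal-driver").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    // Transformations only build the RDD chain (the logical plan) ...
    val doubled = sc.parallelize(1 to 100).map(_ * 2)

    // ... the action is what goes through the sc.runJob() chain shown above.
    println(s"count = ${doubled.count()}")

    sc.stop()
  }
}
```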
Job logical plan
`transformation()` in the driver program builds a computing chain (a series of `RDD`s). In each `RDD`:
- the `compute()` function defines the computation of the records for its partitions;
- the `getDependencies()` function defines the dependency relationship across RDD partitions.
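To make these two functions concrete, here is a sketch of a hypothetical custom RDD (not Spark source code) that doubles every record of its parent with a one-to-one dependency:

```scala
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Toy RDD: same partitioning as the parent, each record multiplied by 2.
class DoubledRDD(parent: RDD[Int]) extends RDD[Int](parent.sparkContext, Nil) {

  // Dependency relationship across partitions: narrow, one-to-one with the parent.
  override def getDependencies: Seq[Dependency[_]] = Seq(new OneToOneDependency(parent))

  override protected def getPartitions: Array[Partition] = parent.partitions

  // Computation of the records for one partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    parent.iterator(split, context).map(_ * 2)
}
```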
Job physical plan
Each `action()` triggers a job:

- During `dagScheduler.runJob()`, the different stages are defined.
- During `submitStage()`, the `ResultTasks` and `ShuffleMapTasks` needed by the stage are produced, then they are packaged into a `TaskSet` and sent to the `TaskScheduler`. If the `TaskSet` can be executed, the tasks are submitted to `sparkDeploySchedulerBackend`, which distributes them.
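As a concrete (hypothetical) example, assuming an existing `SparkContext` named `sc`, the shuffle introduced by `reduceByKey` below makes the DAGScheduler cut the job into a stage of `ShuffleMapTasks` and a final stage of `ResultTasks`:

```scala
// Narrow transformations stay in the same stage.
val words = sc.parallelize(Seq("a a b", "b c")).flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

// reduceByKey adds a ShuffleDependency, so a stage boundary is cut here:
// stage 0 runs ShuffleMapTasks that write map output, stage 1 runs ResultTasks.
val counts = pairs.reduceByKey(_ + _)

// The action triggers the job and therefore the whole submission chain above.
counts.collect().foreach(println)
```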
Task distribution
After `sparkDeploySchedulerBackend` gets the `TaskSet`, the `Driver Actor` sends the serialized tasks to the `CoarseGrainedExecutorBackend` actor on the worker node.
Job reception
After receiving tasks, the worker does the following:
```scala
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
```
The executor packages each task into a `TaskRunner` and picks a free thread to run it. A `CoarseGrainedExecutorBackend` process has exactly one executor.
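The thread-pool pattern itself is simple; the following is a simplified sketch of the idea using plain `java.util.concurrent`, not Spark's actual `Executor` class, and all names are assumptions:

```scala
import java.util.concurrent.Executors

// Sketch: each LaunchTask message becomes a Runnable handed to a cached
// thread pool, so whichever thread is free runs the task.
class ToyExecutor {
  private val threadPool = Executors.newCachedThreadPool()

  class TaskRunner(taskId: Long, serializedTask: Array[Byte]) extends Runnable {
    override def run(): Unit = {
      // Here the real executor would deserialize the task, run it,
      // and report the status back to the driver.
      println(s"running task $taskId (${serializedTask.length} bytes)")
    }
  }

  def launchTask(taskId: Long, serializedTask: Array[Byte]): Unit =
    threadPool.execute(new TaskRunner(taskId, serializedTask))
}
```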
Task execution
The diagram below shows how a task is executed once received by a worker node, and how the driver processes task results.
After receiving a serialized task, the executor deserializes it into a normal task, and then runs the task to get a `directResult`, which will be sent back to the driver. It is noteworthy that the data package sent from an `Actor` cannot be too big:

- If the result is too big (e.g. that of a `groupByKey`), it is persisted to "memory + hard disk" and managed by `blockManager`. The driver only gets an `indirectResult` containing the storage location. When the result is needed, the driver fetches it via HTTP.
- If the result is not too big (less than `spark.akka.frameSize = 10MB`), then it is sent directly to the driver.
Some more details about `blockManager`:

When `directResult > akka.frameSize`, the `memoryStore` of `BlockManager` creates a `LinkedHashMap` to hold the data stored in memory, whose total size must be less than `Runtime.getRuntime.maxMemory * spark.storage.memoryFraction` (default 0.6). If the `LinkedHashMap` has no space left for the incoming data, the data is handed to `diskStore`, which persists it to hard disk if the data's `storageLevel` contains "disk".
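As a rough illustration of this bookkeeping, here is a simplified sketch only, with hypothetical names and a much cruder policy than Spark's real `MemoryStore`:

```scala
import java.util.LinkedHashMap

// Sketch: blocks live in a LinkedHashMap bounded by maxMemory * memoryFraction;
// a block that does not fit in memory is handed to the disk store instead.
class ToyMemoryStore(persistToDisk: (String, Array[Byte]) => Unit,
                     memoryFraction: Double = 0.6) {
  private val maxBytes = (Runtime.getRuntime.maxMemory * memoryFraction).toLong
  private val entries = new LinkedHashMap[String, Array[Byte]]()
  private var currentBytes = 0L

  def putBytes(blockId: String, bytes: Array[Byte]): Unit = synchronized {
    if (currentBytes + bytes.length <= maxBytes) {
      entries.put(blockId, bytes)     // fits in memory
      currentBytes += bytes.length
    } else {
      persistToDisk(blockId, bytes)   // no space left: fall back to disk
    }
  }

  def getBytes(blockId: String): Option[Array[Byte]] = synchronized {
    Option(entries.get(blockId))
  }
}
```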
```scala
In TaskRunner.run()
// deserialize task, run it and then send the result to
=> coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if (directResult.size() > akkaFrameSize())
     indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER)
   else
     return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)
```
The results produced by `ShuffleMapTask` and `ResultTask` are different.

- `ShuffleMapTask` produces a `MapStatus` containing 2 parts:
  - the `BlockManagerId` of the task's `BlockManager`: (executorId + host, port, nettyPort);
  - the size of each output `FileSegment` of the task.
- `ResultTask` produces the execution result of the specified `function` on one partition, e.g. the `function` of `count()` simply counts the number of records in a partition.

Since `ShuffleMapTask` needs to write `FileSegment`s to disk, `OutputStream` writers are needed. These writers are produced and managed by the `blockManager` of `shuffleBlockManager`.
```scala
In task.run(taskId)
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])

// if the task is ResultTask
=> return func(context, rdd.iterator(split, context))
```
A series of operations is executed after the driver gets a task's result:

- `TaskScheduler` is notified that the task is finished, and its result is processed:
  - If it is an `indirectResult`, `BlockManager.getRemoteBytes()` is invoked to fetch the actual result.
  - If it is the result of a `ResultTask`, `ResultHandler()` invokes the driver-side computation on the result (e.g. `count()` sums over all `ResultTask` results).
  - If it is the `MapStatus` of a `ShuffleMapTask`, then the `MapStatus` is put into the `mapStatuses` of `mapOutputTrackerMaster`, which makes it easier to query during the reduce-side shuffle.
- If the received task is the last task of the stage, the next stage is submitted. If the stage is already the last one, `dagScheduler` is informed that the job is finished.
```scala
After driver receives StatusUpdate(result)

=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is IndirectResult
     serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> taskSetManager.handleSuccessfulTask(tid, taskResult)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates)
=> dagScheduler.handleTaskCompletion(completion)
=> Accumulators.add(event.accumUpdates)

// if the finished task is ResultTask
=> if (job.numFinished == job.numPartitions)
     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
=> job.listener.taskSucceeded(outputId, result)
=> jobWaiter.taskSucceeded(index, result)
=> resultHandler(index, result)

// if the finished task is ShuffleMapTask
=> stage.addOutputLoc(smt.partitionId, status)
=> if (all tasks in current stage have finished)
     mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus])
     mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
=> submitStage(stage)
```
Shuffle read
In the preceding paragraphs, we talked about task execution and result processing; now we will talk about how a reducer (a task that needs shuffled data) gets its input data. The shuffle read section of the last chapter has already described how a reducer processes its input data.
How does the reducer know where to fetch data?
The reducer needs to know on which nodes the `FileSegments` produced by the `ShuffleMapTasks` of the parent stage are located. This kind of information is sent to the driver's `mapOutputTrackerMaster` when a `ShuffleMapTask` finishes, and it is stored in `mapStatuses: HashMap[stageId, Array[MapStatus]]`. Given a `stageId`, we can get the `Array[MapStatus]` which contains information about the `FileSegments` produced by the `ShuffleMapTasks`. `Array(taskId)` contains the location (`blockManagerId`) and the size of each `FileSegment`.
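Below is a sketch of how such a lookup could be turned into the `blocksByAddress` structure used in the shuffle-read call chain further down; the case classes and the block-id format are simplified stand-ins for Spark's internal types, not its actual API.

```scala
// Simplified stand-ins for Spark's internal types, for illustration only.
case class BlockManagerId(executorId: String, host: String, port: Int)
case class MapStatus(location: BlockManagerId, segmentSizes: Array[Long])

// For one reducer (reduceId): pair every map output segment with its location,
// then group by BlockManagerId so each entry says "fetch these blocks from that node".
def blocksByAddress(shuffleId: Int, reduceId: Int, statuses: Array[MapStatus])
    : Seq[(BlockManagerId, Seq[(String, Long)])] =
  statuses.zipWithIndex
    .map { case (status, mapId) =>
      (status.location, (s"shuffle_${shuffleId}_${mapId}_${reduceId}", status.segmentSizes(reduceId)))
    }
    .groupBy(_._1)
    .map { case (location, blocks) => (location, blocks.map(_._2).toSeq) }
    .toSeq
```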
When the reducer needs to fetch input data, it first invokes `blockStoreShuffleFetcher` to get the input data's locations (`FileSegments`). `blockStoreShuffleFetcher` calls the local `MapOutputTrackerWorker` to do the work. `MapOutputTrackerWorker` uses `mapOutputTrackerMasterActorRef` to communicate with `mapOutputTrackerMasterActor` in order to get the `MapStatus` information. `blockStoreShuffleFetcher` processes the `MapStatus` and finds out where the reducer should fetch the `FileSegment`s, then stores this information in `blocksByAddress`. Finally, `blockStoreShuffleFetcher` tells `basicBlockFetcherIterator` to fetch the `FileSegment` data.
```scala
rdd.iterator()
=> rdd(e.g., ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)

=> blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
```
After `basicBlockFetcherIterator` has received the data-fetching task, it produces several `fetchRequest`s. **Each of them contains the tasks to fetch `FileSegment`s from several nodes.** According to the diagram above, we know that `reducer-2` needs to fetch `FileSegment`s (FS, in white) from 3 worker nodes. The global data-fetching task can be represented by `blocksByAddress`: 4 blocks from node 1, 3 blocks from node 2, and 4 blocks from node 3.

In order to accelerate the data-fetching process, it makes sense to divide the global task into sub-tasks (`fetchRequest`s), and each sub-task takes a thread to fetch data. Spark launches 5 parallel threads for each reducer (the same as Hadoop). Since the fetched data is buffered in memory, one fetch cannot take too much data (no more than `spark.reducer.maxMbInFlight = 48MB`). Note that the 48MB is shared by the 5 fetch threads, so each sub-task should take no more than `48MB / 5 = 9.6MB`. In the diagram, on node 1, we have `size(FS0-2) + size(FS1-2) < 9.6MB` but `size(FS0-2) + size(FS1-2) + size(FS2-2) > 9.6MB`, so we should break between `t1-r2` and `t2-r2`. As a result, we can see 2 `fetchRequest`s fetching data from node 1. **Can a `fetchRequest` be larger than 9.6MB?** The answer is yes: if a single `FileSegment` is very large, it still has to be fetched in one shot. Besides, if the reducer needs `FileSegment`s that already exist locally, it reads them locally. At the end of shuffle read, it deserializes the fetched `FileSegment`s and offers the record iterators to `RDD.compute()`.
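A sketch of this splitting logic is shown below; it is loosely modeled on the behavior of `splitLocalRemoteBlocks`, and the names and the exact break rule are assumptions.

```scala
case class FetchRequest(address: String, blocks: Seq[(String, Long)])   // (blockId, size)

// Split one node's block list into fetchRequests of roughly maxBytesInFlight / 5
// bytes each, so that 5 requests can be in flight in parallel per reducer.
def splitIntoRequests(address: String,
                      blocks: Seq[(String, Long)],
                      maxBytesInFlight: Long = 48L * 1024 * 1024): Seq[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)   // 9.6MB by default
  val requests = scala.collection.mutable.ArrayBuffer[FetchRequest]()
  var current = scala.collection.mutable.ArrayBuffer[(String, Long)]()
  var currentSize = 0L

  for ((blockId, size) <- blocks) {
    // Close the current request if this block would push it over the target,
    // unless it is empty: an oversized FileSegment is still fetched in one shot.
    if (current.nonEmpty && currentSize + size > targetRequestSize) {
      requests += FetchRequest(address, current.toSeq)
      current = scala.collection.mutable.ArrayBuffer[(String, Long)]()
      currentSize = 0L
    }
    current += ((blockId, size))
    currentSize += size
  }
  if (current.nonEmpty) requests += FetchRequest(address, current.toSeq)
  requests.toSeq
}
```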
```scala
In basicBlockFetcherIterator:

// generate the fetch requests
=> basicBlockFetcherIterator.initialize()
=> remoteRequests = splitLocalRemoteBlocks()
=> fetchRequests ++= Utils.randomize(remoteRequests)

// fetch remote blocks
=> sendRequest(fetchRequests.dequeue()) until Size(fetchRequests) > maxBytesInFlight
=> blockManager.connectionManager.sendMessageReliably(cmId,
     blockMessageArray.toBufferMessage)
=> fetchResults.put(new FetchResult(blockId, sizeMap(blockId)))
=> dataDeserialize(blockId, blockMessage.getData, serializer)

// fetch local blocks
=> getLocalBlocks()
=> fetchResults.put(new FetchResult(id, 0, () => iter))
```
Some more details:
How does the reducer send a `fetchRequest` to the target node? How does the target node process the `fetchRequest`, read the `FileSegment` and send it back to the reducer?
When `RDD.iterator()` meets a `ShuffleDependency`, `BasicBlockFetcherIterator` is called to fetch the `FileSegment`s. `BasicBlockFetcherIterator` uses the `connectionManager` of `blockManager` to send `fetchRequest`s to the `connectionManager`s on the other nodes. NIO is used for the communication between `connectionManager`s. On the other nodes, for example, after the `connectionManager` on worker node 2 receives a message, it forwards the message to `blockManager`. The latter uses `diskStore` to locally read the `FileSegment`s requested by the `fetchRequest`, and they are sent back by `connectionManager`. If `FileConsolidation` is activated, `diskStore` needs the location of the `blockId` given by `shuffleBlockManager`. If a `FileSegment` is no larger than `spark.storage.memoryMapThreshold = 8KB`, then `diskStore` puts the `FileSegment` into memory when reading it; otherwise, the memory-mapping method of `FileChannel` in `RandomAccessFile` is used to read the `FileSegment`, so that large `FileSegment`s can be loaded into memory.
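A minimal sketch of that read path is shown below; the method name and the default threshold are assumptions, and Spark's real `DiskStore.getBytes` does more than this.

```scala
import java.io.{EOFException, File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

// Read [offset, offset + length) from a file: small segments are copied into a
// heap buffer, larger ones are memory-mapped instead of being copied.
def readSegment(file: File, offset: Long, length: Long,
                memoryMapThreshold: Long = 8 * 1024): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    if (length < memoryMapThreshold) {
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      while (buf.hasRemaining) {
        if (channel.read(buf) == -1) throw new EOFException(s"unexpected end of $file")
      }
      buf.flip()
      buf
    } else {
      // The mapping stays valid even after the channel is closed.
      channel.map(MapMode.READ_ONLY, offset, length)
    }
  } finally {
    channel.close()
  }
}
```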
When `BasicBlockFetcherIterator` receives serialized `FileSegments` from other nodes, it deserializes them and puts them into `fetchResults.Queue`. You may notice that `fetchResults.Queue` is similar to the `softBuffer` in the Shuffle details chapter. If the `FileSegment`s needed by `BasicBlockFetcherIterator` are local, they are found locally by `diskStore` and put into `fetchResults`. Finally, the reducer reads the records from the `FileSegment`s and processes them.
```scala
After the blockManager receives the fetch request

=> connectionManager.receiveMessage(bufferMessage)
=> handleMessage(connectionManagerId, message, connection)

// invoke blockManagerWorker to read the block (FileSegment)
=> blockManagerWorker.onBlockMessageReceive()
=> blockManagerWorker.processBlockMessage(blockMessage)
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> shuffleManager.getBlockLocation()
=> if (fileSegment < minMemoryMapBytes)
     buffer = ByteBuffer.allocate(fileSegment)
   else
     channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
```
Every reducer has a `BasicBlockFetcherIterator`, and one `BasicBlockFetcherIterator` could, in theory, hold 48MB of `fetchResults`. As soon as one `FileSegment` in `fetchResults` is read off, more `FileSegment`s are fetched to fill that 48MB.
```scala
BasicBlockFetcherIterator.next()
=> result = results.take()
=> while (!fetchRequests.isEmpty &&
       (bytesInFlight == 0 ||
        bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
     sendRequest(fetchRequests.dequeue())
   }
=> result.deserialize()
```
Discussion
In terms of architecture design, functionalities and modules are pretty independent. `BlockManager` is well designed, but it seems to manage too many things (data blocks, memory, disk and network communication).
This chapter discussed how the modules of the Spark system are coordinated to finish a job (production, submission, execution, result collection, result computation and shuffle). A lot of code is pasted and many diagrams are drawn; more details can be found in the source code, if you want.
If you also want to know more about `blockManager`, please refer to Jerry Shao's blog (in Chinese).